package com.lelebd.szt.source;

import com.alibaba.fastjson.JSONObject;
import com.lelebd.szt.config.SZTConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

/**
 * Flink source stub for SZT (Shenzhen Tong) card-swipe records.
 *
 * <p>The constructor builds a {@link FlinkKafkaConsumer010} for {@code SZTConfig.TOPIC}
 * against a Kerberos-secured (SASL_PLAINTEXT) broker. NOTE(review): {@link #run} is
 * still an empty stub — this source currently emits nothing; the configured consumer
 * is kept as a field so the setup is not dead code, but wiring it to {@code ctx}
 * (or registering the consumer directly with the execution environment, which is the
 * usual pattern since FlinkKafkaConsumer010 is itself a SourceFunction) remains TODO.
 */
public class SZTSource implements SourceFunction<JSONObject> {

    // Cooperative cancellation flag; volatile because cancel() is invoked from a
    // different thread than run().
    private volatile boolean running = true;

    // Configured Kafka consumer. Kept as a field: the original built it in the
    // constructor and immediately discarded it.
    private final FlinkKafkaConsumer010<String> consumer;

    public SZTSource() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "139.198.108.148:9092");
        properties.put("security.protocol", "SASL_PLAINTEXT");
        properties.put("kerberos.domain.name", "hadoop.hadoop.com");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "console-consumer-8102");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "6000000");
        properties.put("sasl.kerberos.service.name", "kafka");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "spt");
        // BUG FIX: a *consumer* needs deserializers. The original configured
        // StringSerializer for both keys and values, which Kafka rejects when the
        // consumer is instantiated (StringSerializer does not implement Deserializer).
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        consumer = new FlinkKafkaConsumer010<>(SZTConfig.TOPIC, new SimpleStringSchema(), properties);
        // The original called setStartFromEarliest() and then setStartFromGroupOffsets();
        // the later call wins, so only the effective start position is kept.
        consumer.setStartFromGroupOffsets();
    }

    /**
     * NOTE(review): intentionally left as a no-op pending implementation — the
     * original body was empty, so this source finishes immediately without
     * emitting any records. Preserved to avoid inventing behavior.
     */
    @Override
    public void run(SourceContext<JSONObject> ctx) throws Exception {
        // TODO: consume from `consumer` (or restructure so the Flink Kafka
        // connector is used as the job's source directly) and emit JSONObjects
        // via ctx.collect(...) while `running` is true.
    }

    /** Signals {@link #run} to stop; safe to call from any thread. */
    @Override
    public void cancel() {
        running = false;
    }
}
